{
  "cells": [
    {
      "cell_type": "markdown",
      "metadata": {},
      "source": [
        "Copyright (c) Microsoft Corporation. All rights reserved.  \n",
        "Licensed under the MIT License."
      ]
    },
    {
      "cell_type": "markdown",
      "metadata": {},
      "source": [
        "# NYC Taxi Data Regression Model\n",
        "This is an [Azure Machine Learning Pipelines](https://aka.ms/aml-pipelines) version of the two-part tutorial ([Part 1](https://docs.microsoft.com/en-us/azure/machine-learning/service/tutorial-data-prep), [Part 2](https://docs.microsoft.com/en-us/azure/machine-learning/service/tutorial-auto-train-models)) available for Azure Machine Learning.\n",
        "\n",
        "You can combine the two-part tutorial into one notebook because Azure Machine Learning Pipelines provide a way to stitch together the steps of a machine learning workflow (data preparation and training, in this case).\n",
        "\n",
        "In this notebook, you learn how to prepare data for regression modeling by using the open-source library [pandas](https://pandas.pydata.org/). You run various transformations to filter and combine two different NYC taxi datasets. Once the NYC taxi data is prepared, you use [AutoMLStep](https://docs.microsoft.com/python/api/azureml-train-automl-runtime/azureml.train.automl.runtime.automl_step.automlstep?view=azure-ml-py), available with [Azure Machine Learning Pipelines](https://aka.ms/aml-pipelines), to define your machine learning goals and constraints and to launch the automated machine learning process. Automated machine learning iterates over many combinations of algorithms and hyperparameters until it finds the best model based on your criterion.\n",
        "\n",
        "Once the model is trained, you can use it to predict the cost of a taxi trip from data features. These features include the pickup day and time, the number of passengers, and the pickup location.\n",
        "\n",
        "## Prerequisite\n",
        "If you are using an Azure Machine Learning Notebook VM, you are all set. Otherwise, first go through the configuration notebook located at https://github.com/Azure/MachineLearningNotebooks if you haven't already. This sets you up with a working config file that contains your workspace information, subscription ID, etc."
      ]
    },
    {
      "cell_type": "markdown",
      "metadata": {},
      "source": [
        "## Prepare data for regression modeling\n",
        "First, we prepare data for regression modeling. We use Azure Open Datasets together with Azure Machine Learning to create a regression model that predicts NYC taxi fares. Run `pip install azureml-opendatasets` to get the Open Datasets package. The package contains a class representing each data source (`NycTlcGreen` and `NycTlcYellow`) so that you can easily filter by date before downloading.\n",
        "\n",
        "\n",
        "### Load data\n",
        "Begin by creating a dataframe to hold the taxi data. When working in a non-Spark environment, Open Datasets only allows downloading one month of data at a time with certain classes, to avoid a `MemoryError` with large datasets. To download a year of taxi data, fetch one month at a time and, before appending it to `green_df_raw`, randomly sample `sample_size` records from each month to avoid bloating the dataframe. Then preview the data. To keep this process short, we sample only 1 month of data.\n",
        "\n",
        "Note: Open Datasets has mirroring classes for working in Spark environments where data size and memory aren't a concern."
      ]
    },
    {
      "cell_type": "code",
      "execution_count": null,
      "metadata": {},
      "outputs": [],
      "source": [
        "import azureml.core\n",
        "# Check core SDK version number\n",
        "print(\"SDK version:\", azureml.core.VERSION)"
      ]
    },
    {
      "cell_type": "code",
      "execution_count": null,
      "metadata": {},
      "outputs": [],
      "source": [
        "from azureml.opendatasets import NycTlcGreen, NycTlcYellow\n",
        "import pandas as pd\n",
        "from datetime import datetime\n",
        "from dateutil.relativedelta import relativedelta\n",
        "\n",
        "green_df_raw = pd.DataFrame([\n",
        "    [2,\"2016-01-03 21:02:35\",\"2016-01-03 21:05:52\",1,0.83,\"\",\"\",-73.98726654052734,40.6938362121582,-73.97611236572266,40.69454574584961,1,\"N\",1,4.5,0.5,0.5,0.3,0.0,0.0,\"\",5.8,1.0],\n",
        "    [2,\"2016-01-19 21:49:17\",\"2016-01-19 21:54:37\",2,1.27,\"\",\"\",-73.94845581054688,40.80146789550781,-73.95975494384766,40.81214904785156,1,\"N\",1,6.0,0.5,0.5,0.3,1.0,0.0,\"\",8.3,1.0],\n",
        "    [2,\"2016-01-05 09:46:18\",\"2016-01-05 09:57:28\",1,1.8,\"\",\"\",-73.9554443359375,40.6797981262207,-73.98030853271484,40.678741455078125,1,\"N\",1,9.5,0.0,0.5,0.3,2.06,0.0,\"\",12.36,1.0],\n",
        "    [1,\"2016-01-08 17:49:12\",\"2016-01-08 17:52:20\",1,0.5,\"\",\"\",-73.92293548583984,40.76081848144531,-73.92549896240234,40.75471496582031,1,\"N\",1,4.0,1.0,0.5,0.3,1.15,0.0,\"\",6.95,1.0],\n",
        "    [1,\"2016-01-29 10:28:21\",\"2016-01-29 10:34:59\",1,0.9,\"\",\"\",-73.92304229736328,40.664939880371094,-73.91104125976562,40.66966247558594,1,\"N\",2,6.0,0.0,0.5,0.3,0.0,0.0,\"\",6.8,1.0],\n",
        "    ] * 50, columns=[\"vendorID\",\"lpepPickupDatetime\",\"lpepDropoffDatetime\",\"passengerCount\",\"tripDistance\",\"puLocationId\",\"doLocationId\",\"pickupLongitude\",\"pickupLatitude\",\"dropoffLongitude\",\"dropoffLatitude\",\"rateCodeID\",\"storeAndFwdFlag\",\"paymentType\",\"fareAmount\",\"extra\",\"mtaTax\",\"improvementSurcharge\",\"tipAmount\",\"tollsAmount\",\"ehailFee\",\"totalAmount\",\"tripType\"])\n",
        "start = datetime.strptime(\"1/1/2016\",\"%m/%d/%Y\")\n",
        "end = datetime.strptime(\"1/31/2016\",\"%m/%d/%Y\")\n",
        "\n",
        "number_of_months = 1\n",
        "sample_size = 5000\n",
        "\n",
        "for sample_month in range(number_of_months):\n",
        "    temp_df_green = NycTlcGreen(start + relativedelta(months=sample_month), end + relativedelta(months=sample_month)) \\\n",
        "        .to_pandas_dataframe()\n",
        "    if temp_df_green is None:\n",
        "        continue\n",
        "    # DataFrame.append was removed in pandas 2.0; pd.concat works across versions\n",
        "    green_df_raw = pd.concat([green_df_raw, temp_df_green.sample(sample_size)])"
      ]
    },
    {
      "cell_type": "code",
      "execution_count": null,
      "metadata": {},
      "outputs": [],
      "source": [
        "yellow_df_raw = pd.DataFrame([\n",
        "    [2,\"2016-01-06 12:09:13\",\"2016-01-06 12:22:14\",1,2.09,\"\",\"\",-73.98207092285156,40.74605941772461,-74.00462341308594,40.730628967285156,1,\"N\",1,10.5,0.0,0.5,0.3,2.26,0.0,13.56],\n",
        "    [1,\"2016-01-03 17:57:48\",\"2016-01-03 18:08:18\",3,1.5,\"\",\"\",-73.96627044677734,40.764835357666016,-73.98455047607422,40.75786209106445,1,\"N\",2,8.5,1.0,0.5,0.3,0.0,0.0,10.3],\n",
        "    [1,\"2016-01-18 07:37:51\",\"2016-01-18 07:47:01\",1,1.8,\"\",\"\",0.0,0.0,0.0,0.0,1,\"N\",1,8.5,0.0,0.5,0.3,1.85,0.0,11.15],\n",
        "    [2,\"2016-01-26 00:31:36\",\"2016-01-26 00:38:47\",1,1.96,\"\",\"\",-73.9906234741211,40.7553596496582,-73.97895812988281,40.78070831298828,1,\"N\",1,8.0,0.5,0.5,0.3,1.0,0.0,10.3],\n",
        "    [2,\"2016-01-20 23:37:22\",\"2016-01-20 23:51:09\",1,3.6,\"\",\"\",-73.98528289794922,40.76026153564453,-74.01127624511719,40.7148323059082,1,\"N\",1,13.5,0.5,0.5,0.3,2.5,0.0,17.3]\n",
        "    ] * 50, columns=[\"vendorID\",\"tpepPickupDateTime\",\"tpepDropoffDateTime\",\"passengerCount\",\"tripDistance\",\"puLocationId\",\"doLocationId\",\"startLon\",\"startLat\",\"endLon\",\"endLat\",\"rateCodeID\",\"storeAndFwdFlag\",\"paymentType\",\"fareAmount\",\"extra\",\"mtaTax\",\"improvementSurcharge\",\"tipAmount\",\"tollsAmount\",\"totalAmount\"])\n",
        "start = datetime.strptime(\"1/1/2016\",\"%m/%d/%Y\")\n",
        "end = datetime.strptime(\"1/31/2016\",\"%m/%d/%Y\")\n",
        "\n",
        "sample_size = 500\n",
        "\n",
        "for sample_month in range(number_of_months):\n",
        "    temp_df_yellow = NycTlcYellow(start + relativedelta(months=sample_month), end + relativedelta(months=sample_month)) \\\n",
        "        .to_pandas_dataframe()\n",
        "    if temp_df_yellow is None:\n",
        "        continue\n",
        "    # DataFrame.append was removed in pandas 2.0; pd.concat works across versions\n",
        "    yellow_df_raw = pd.concat([yellow_df_raw, temp_df_yellow.sample(sample_size)])"
      ]
    },
    {
      "cell_type": "markdown",
      "metadata": {},
      "source": [
        "### See the data"
      ]
    },
    {
      "cell_type": "code",
      "execution_count": null,
      "metadata": {},
      "outputs": [],
      "source": [
        "from IPython.display import display\n",
        "\n",
        "display(green_df_raw.head(5))\n",
        "display(yellow_df_raw.head(5))"
      ]
    },
    {
      "cell_type": "markdown",
      "metadata": {},
      "source": [
        "### Download data locally and then upload to Azure Blob\n",
        "This is a one-time process to save the data in the default datastore."
      ]
    },
    {
      "cell_type": "code",
      "execution_count": null,
      "metadata": {},
      "outputs": [],
      "source": [
        "import os\n",
        "\n",
        "dataDir = \"data\"\n",
        "greenDir = dataDir + \"/green\"\n",
        "yellowDir = dataDir + \"/yellow\"\n",
        "\n",
        "# Create the local folders if they do not exist yet\n",
        "os.makedirs(greenDir, exist_ok=True)\n",
        "os.makedirs(yellowDir, exist_ok=True)\n",
        "\n",
        "# Note: despite the .parquet extension, these files are written as CSV;\n",
        "# they are read back later with Dataset.Tabular.from_delimited_files\n",
        "greenTaxiData = greenDir + \"/unprepared.parquet\"\n",
        "yellowTaxiData = yellowDir + \"/unprepared.parquet\"\n",
        "\n",
        "green_df_raw.to_csv(greenTaxiData, index=False)\n",
        "yellow_df_raw.to_csv(yellowTaxiData, index=False)\n",
        "\n",
        "print(\"Data written to local folder.\")"
      ]
    },
    {
      "cell_type": "code",
      "execution_count": null,
      "metadata": {},
      "outputs": [],
      "source": [
        "from azureml.core import Workspace\n",
        "\n",
        "ws = Workspace.from_config()\n",
        "print(\"Workspace: \" + ws.name, \"Region: \" + ws.location, sep = '\\n')\n",
        "\n",
        "# Default datastore\n",
        "default_store = ws.get_default_datastore() \n",
        "\n",
        "default_store.upload_files([greenTaxiData], \n",
        "                           target_path = 'green', \n",
        "                           overwrite = True, \n",
        "                           show_progress = True)\n",
        "\n",
        "default_store.upload_files([yellowTaxiData], \n",
        "                           target_path = 'yellow', \n",
        "                           overwrite = True, \n",
        "                           show_progress = True)\n",
        "\n",
        "print(\"Upload calls completed.\")"
      ]
    },
    {
      "cell_type": "markdown",
      "metadata": {},
      "source": [
        "### Create and register datasets\n",
        "\n",
        "By creating a dataset, you create a reference to the data source location. If you apply any subsetting transformations to the dataset, they are stored in the dataset as well. To learn which subsetting capabilities are supported, see [our documentation](https://docs.microsoft.com/en-us/python/api/azureml-core/azureml.data.tabular_dataset.tabulardataset?view=azure-ml-py#remarks). The data remains in its existing location, so no extra storage cost is incurred."
      ]
    },
    {
      "cell_type": "code",
      "execution_count": null,
      "metadata": {},
      "outputs": [],
      "source": [
        "from azureml.core import Dataset\n",
        "\n",
        "# The uploaded files were written with to_csv, so they are delimited text\n",
        "# despite the .parquet extension and are read with from_delimited_files\n",
        "green_taxi_data = Dataset.Tabular.from_delimited_files(default_store.path('green/unprepared.parquet'))\n",
        "yellow_taxi_data = Dataset.Tabular.from_delimited_files(default_store.path('yellow/unprepared.parquet'))"
      ]
    },
    {
      "cell_type": "markdown",
      "metadata": {},
      "source": [
        "Register the taxi datasets with the workspace so that you can reuse them in other experiments or share with your colleagues who have access to your workspace."
      ]
    },
    {
      "cell_type": "code",
      "execution_count": null,
      "metadata": {},
      "outputs": [],
      "source": [
        "green_taxi_data = green_taxi_data.register(ws, 'green_taxi_data')\n",
        "yellow_taxi_data = yellow_taxi_data.register(ws, 'yellow_taxi_data')"
      ]
    },
    {
      "cell_type": "markdown",
      "metadata": {},
      "source": [
        "### Setup Compute\n",
        "#### Create new or use an existing compute\n",
        "\n",
        "> Note that if you have an AzureML Data Scientist role, you will not have permission to create compute resources. Talk to your workspace or IT admin to create the compute targets described in this section, if they do not already exist."
      ]
    },
    {
      "cell_type": "code",
      "execution_count": null,
      "metadata": {},
      "outputs": [],
      "source": [
        "from azureml.core.compute import ComputeTarget, AmlCompute\n",
        "from azureml.core.compute_target import ComputeTargetException\n",
        "\n",
        "# Choose a name for your CPU cluster\n",
        "amlcompute_cluster_name = \"cpu-cluster\"\n",
        "\n",
        "# Verify that cluster does not exist already\n",
        "try:\n",
        "    aml_compute = ComputeTarget(workspace=ws, name=amlcompute_cluster_name)\n",
        "    print('Found existing cluster, use it.')\n",
        "except ComputeTargetException:\n",
        "    compute_config = AmlCompute.provisioning_configuration(vm_size='STANDARD_DS12_V2',\n",
        "                                                           max_nodes=4)\n",
        "    aml_compute = ComputeTarget.create(ws, amlcompute_cluster_name, compute_config)\n",
        "\n",
        "aml_compute.wait_for_completion(show_output=True)"
      ]
    },
    {
      "cell_type": "markdown",
      "metadata": {},
      "source": [
        "#### Define RunConfig for the compute\n",
        "The pipeline steps use `pandas`, `scikit-learn`, `pyarrow`, and the automated machine learning packages, so we define a `RunConfiguration` that installs them."
      ]
    },
    {
      "cell_type": "code",
      "execution_count": null,
      "metadata": {},
      "outputs": [],
      "source": [
        "from azureml.core.runconfig import RunConfiguration\n",
        "from azureml.core.conda_dependencies import CondaDependencies\n",
        "\n",
        "# Create a new runconfig object\n",
        "aml_run_config = RunConfiguration()\n",
        "\n",
        "# Use the aml_compute you created above. \n",
        "aml_run_config.target = aml_compute\n",
        "\n",
        "# Enable Docker\n",
        "aml_run_config.environment.docker.enabled = True\n",
        "\n",
        "# Use conda_dependencies.yml to create a conda environment in the Docker image for execution\n",
        "aml_run_config.environment.python.user_managed_dependencies = False\n",
        "\n",
        "# Specify CondaDependencies obj, add necessary packages\n",
        "aml_run_config.environment.python.conda_dependencies = CondaDependencies.create(\n",
        "    conda_packages=['pandas','scikit-learn'], \n",
        "    pip_packages=['azureml-sdk[automl]', 'pyarrow==14.0.2'])\n",
        "\n",
        "print (\"Run configuration created.\")"
      ]
    },
    {
      "cell_type": "markdown",
      "metadata": {},
      "source": [
        "### Prepare data\n",
        "Now we prepare the data for regression modeling by using `pandas`. We run various transformations to filter and combine two different NYC taxi datasets.\n",
        "\n",
        "We create a separate pipeline step for each transformation. This lets us reuse individual steps and avoids rerunning everything when only one step changes. We keep data preparation scripts in one subfolder and training scripts in another.\n",
        "\n",
        "> The best practice is to keep each step's scripts and their dependent files in a separate folder and specify that folder as the `source_directory` for the step. This reduces the size of the snapshot created for the step (only that folder is snapshotted). Because a change to any file in the `source_directory` triggers a re-upload of the snapshot, separate folders also let a step be reused when nothing in its own `source_directory` has changed."
      ]
    },
    {
      "cell_type": "markdown",
      "metadata": {},
      "source": [
        "#### Define Useful Columns\n",
        "Here we are defining a set of \"useful\" columns for both Green and Yellow taxi data."
      ]
    },
    {
      "cell_type": "code",
      "execution_count": null,
      "metadata": {},
      "outputs": [],
      "source": [
        "display(green_df_raw.columns)\n",
        "display(yellow_df_raw.columns)\n",
        "\n",
        "# useful columns needed for the Azure Machine Learning NYC Taxi tutorial\n",
        "useful_columns = str([\"cost\", \"distance\", \"dropoff_datetime\", \"dropoff_latitude\", \n",
        "                      \"dropoff_longitude\", \"passengers\", \"pickup_datetime\", \n",
        "                      \"pickup_latitude\", \"pickup_longitude\", \"store_forward\", \"vendor\"]).replace(\",\", \";\")\n",
        "\n",
        "print(\"Useful columns defined.\")"
      ]
    },
    {
      "cell_type": "markdown",
      "metadata": {},
      "source": [
        "#### Cleanse Green taxi data"
      ]
    },
    {
      "cell_type": "code",
      "execution_count": null,
      "metadata": {},
      "outputs": [],
      "source": [
        "from azureml.pipeline.core import PipelineData\n",
        "from azureml.pipeline.steps import PythonScriptStep\n",
        "\n",
        "# python scripts folder\n",
        "prepare_data_folder = './scripts/prepdata'\n",
        "\n",
        "# rename columns as per Azure Machine Learning NYC Taxi tutorial\n",
        "green_columns = str({ \n",
        "    \"vendorID\": \"vendor\",\n",
        "    \"lpepPickupDatetime\": \"pickup_datetime\",\n",
        "    \"lpepDropoffDatetime\": \"dropoff_datetime\",\n",
        "    \"storeAndFwdFlag\": \"store_forward\",\n",
        "    \"pickupLongitude\": \"pickup_longitude\",\n",
        "    \"pickupLatitude\": \"pickup_latitude\",\n",
        "    \"dropoffLongitude\": \"dropoff_longitude\",\n",
        "    \"dropoffLatitude\": \"dropoff_latitude\",\n",
        "    \"passengerCount\": \"passengers\",\n",
        "    \"fareAmount\": \"cost\",\n",
        "    \"tripDistance\": \"distance\"\n",
        "}).replace(\",\", \";\")\n",
        "\n",
        "# Define output after cleansing step\n",
        "cleansed_green_data = PipelineData(\"cleansed_green_data\", datastore=default_store, is_directory=True).as_dataset()\n",
        "\n",
        "print('Cleanse script is in {}.'.format(os.path.realpath(prepare_data_folder)))\n",
        "\n",
        "# cleansing step creation\n",
        "# See the cleanse.py for details about input and output\n",
        "cleansingStepGreen = PythonScriptStep(\n",
        "    name=\"Cleanse Green Taxi Data\",\n",
        "    script_name=\"cleanse.py\", \n",
        "    arguments=[\"--useful_columns\", useful_columns,\n",
        "               \"--columns\", green_columns,\n",
        "               \"--output_cleanse\", cleansed_green_data],\n",
        "    inputs=[green_taxi_data.as_named_input('raw_data')],\n",
        "    outputs=[cleansed_green_data],\n",
        "    compute_target=aml_compute,\n",
        "    runconfig=aml_run_config,\n",
        "    source_directory=prepare_data_folder,\n",
        "    allow_reuse=True\n",
        ")\n",
        "\n",
        "print(\"cleansingStepGreen created.\")"
      ]
    },
    {
      "cell_type": "markdown",
      "metadata": {},
      "source": [
        "#### Cleanse Yellow taxi data"
      ]
    },
    {
      "cell_type": "code",
      "execution_count": null,
      "metadata": {},
      "outputs": [],
      "source": [
        "yellow_columns = str({\n",
        "    \"vendorID\": \"vendor\",\n",
        "    \"tpepPickupDateTime\": \"pickup_datetime\",\n",
        "    \"tpepDropoffDateTime\": \"dropoff_datetime\",\n",
        "    \"storeAndFwdFlag\": \"store_forward\",\n",
        "    \"startLon\": \"pickup_longitude\",\n",
        "    \"startLat\": \"pickup_latitude\",\n",
        "    \"endLon\": \"dropoff_longitude\",\n",
        "    \"endLat\": \"dropoff_latitude\",\n",
        "    \"passengerCount\": \"passengers\",\n",
        "    \"fareAmount\": \"cost\",\n",
        "    \"tripDistance\": \"distance\"\n",
        "}).replace(\",\", \";\")\n",
        "\n",
        "# Define output after cleansing step\n",
        "cleansed_yellow_data = PipelineData(\"cleansed_yellow_data\", datastore=default_store, is_directory=True).as_dataset()\n",
        "\n",
        "print('Cleanse script is in {}.'.format(os.path.realpath(prepare_data_folder)))\n",
        "\n",
        "# cleansing step creation\n",
        "# See the cleanse.py for details about input and output\n",
        "cleansingStepYellow = PythonScriptStep(\n",
        "    name=\"Cleanse Yellow Taxi Data\",\n",
        "    script_name=\"cleanse.py\", \n",
        "    arguments=[\"--useful_columns\", useful_columns,\n",
        "               \"--columns\", yellow_columns,\n",
        "               \"--output_cleanse\", cleansed_yellow_data],\n",
        "    inputs=[yellow_taxi_data.as_named_input('raw_data')],\n",
        "    outputs=[cleansed_yellow_data],\n",
        "    compute_target=aml_compute,\n",
        "    runconfig=aml_run_config,\n",
        "    source_directory=prepare_data_folder,\n",
        "    allow_reuse=True\n",
        ")\n",
        "\n",
        "print(\"cleansingStepYellow created.\")"
      ]
    },
    {
      "cell_type": "markdown",
      "metadata": {},
      "source": [
        "#### Merge cleansed Green and Yellow datasets\n",
        "We are creating a single data source by merging the cleansed versions of Green and Yellow taxi data."
      ]
    },
    {
      "cell_type": "code",
      "execution_count": null,
      "metadata": {},
      "outputs": [],
      "source": [
        "# Define output after merging step\n",
        "merged_data = PipelineData(\"merged_data\", datastore=default_store, is_directory=True).as_dataset()\n",
        "\n",
        "print('Merge script is in {}.'.format(os.path.realpath(prepare_data_folder)))\n",
        "\n",
        "# merging step creation\n",
        "# See the merge.py for details about input and output\n",
        "mergingStep = PythonScriptStep(\n",
        "    name=\"Merge Taxi Data\",\n",
        "    script_name=\"merge.py\", \n",
        "    arguments=[\"--output_merge\", merged_data],\n",
        "    inputs=[cleansed_green_data.parse_parquet_files(),\n",
        "            cleansed_yellow_data.parse_parquet_files()],\n",
        "    outputs=[merged_data],\n",
        "    compute_target=aml_compute,\n",
        "    runconfig=aml_run_config,\n",
        "    source_directory=prepare_data_folder,\n",
        "    allow_reuse=True\n",
        ")\n",
        "\n",
        "print(\"mergingStep created.\")"
      ]
    },
    {
      "cell_type": "markdown",
      "metadata": {},
      "source": [
        "#### Filter data\n",
        "This step filters out coordinates for locations that are outside the city border. We use a TypeConverter object to change the latitude and longitude fields to decimal type. "
      ]
    },
    {
      "cell_type": "code",
      "execution_count": null,
      "metadata": {},
      "outputs": [],
      "source": [
        "# Define output after filter step\n",
        "filtered_data = PipelineData(\"filtered_data\", datastore=default_store, is_directory=True).as_dataset()\n",
        "\n",
        "print('Filter script is in {}.'.format(os.path.realpath(prepare_data_folder)))\n",
        "\n",
        "# filter step creation\n",
        "# See the filter.py for details about input and output\n",
        "filterStep = PythonScriptStep(\n",
        "    name=\"Filter Taxi Data\",\n",
        "    script_name=\"filter.py\", \n",
        "    arguments=[\"--output_filter\", filtered_data],\n",
        "    inputs=[merged_data.parse_parquet_files()],\n",
        "    outputs=[filtered_data],\n",
        "    compute_target=aml_compute,\n",
        "    runconfig = aml_run_config,\n",
        "    source_directory=prepare_data_folder,\n",
        "    allow_reuse=True\n",
        ")\n",
        "\n",
        "print(\"FilterStep created.\")"
      ]
    },
    {
      "cell_type": "markdown",
      "metadata": {},
      "source": [
        "#### Normalize data\n",
        "In this step, we split the pickup and dropoff datetime values into the respective date and time columns and then we rename the columns to use meaningful names."
      ]
    },
    {
      "cell_type": "code",
      "execution_count": null,
      "metadata": {},
      "outputs": [],
      "source": [
        "# Define output after normalize step\n",
        "normalized_data = PipelineData(\"normalized_data\", datastore=default_store, is_directory=True).as_dataset()\n",
        "\n",
        "print('Normalize script is in {}.'.format(os.path.realpath(prepare_data_folder)))\n",
        "\n",
        "# normalize step creation\n",
        "# See the normalize.py for details about input and output\n",
        "normalizeStep = PythonScriptStep(\n",
        "    name=\"Normalize Taxi Data\",\n",
        "    script_name=\"normalize.py\", \n",
        "    arguments=[\"--output_normalize\", normalized_data],\n",
        "    inputs=[filtered_data.parse_parquet_files()],\n",
        "    outputs=[normalized_data],\n",
        "    compute_target=aml_compute,\n",
        "    runconfig = aml_run_config,\n",
        "    source_directory=prepare_data_folder,\n",
        "    allow_reuse=True\n",
        ")\n",
        "\n",
        "print(\"normalizeStep created.\")"
      ]
    },
    {
      "cell_type": "markdown",
      "metadata": {},
      "source": [
        "#### Transform data\n",
        "Transform the normalized taxi data to the final required format. This step does the following:\n",
        "\n",
        "- Split the pickup and dropoff date further into the day of the week, day of the month, and month values. \n",
        "- After new features are generated, use the drop_columns() function to delete the original fields as the newly generated features are preferred. \n",
        "- Rename the rest of the fields to use meaningful descriptions."
      ]
    },
    {
      "cell_type": "code",
      "execution_count": null,
      "metadata": {},
      "outputs": [],
      "source": [
        "# Define output after transform step\n",
        "transformed_data = PipelineData(\"transformed_data\", datastore=default_store, is_directory=True).as_dataset()\n",
        "\n",
        "print('Transform script is in {}.'.format(os.path.realpath(prepare_data_folder)))\n",
        "\n",
        "# transform step creation\n",
        "# See the transform.py for details about input and output\n",
        "transformStep = PythonScriptStep(\n",
        "    name=\"Transform Taxi Data\",\n",
        "    script_name=\"transform.py\", \n",
        "    arguments=[\"--output_transform\", transformed_data],\n",
        "    inputs=[normalized_data.parse_parquet_files()],\n",
        "    outputs=[transformed_data],\n",
        "    compute_target=aml_compute,\n",
        "    runconfig = aml_run_config,\n",
        "    source_directory=prepare_data_folder,\n",
        "    allow_reuse=True\n",
        ")\n",
        "\n",
        "print(\"transformStep created.\")"
      ]
    },
    {
      "cell_type": "markdown",
      "metadata": {},
      "source": [
        "### Split the data into train and test sets\n",
        "This step splits the data into one dataset for model training and another for testing."
      ]
    },
    {
      "cell_type": "code",
      "execution_count": null,
      "metadata": {},
      "outputs": [],
      "source": [
        "train_model_folder = './scripts/trainmodel'\n",
        "\n",
        "# train and test splits output\n",
        "output_split_train = PipelineData(\"output_split_train\", datastore=default_store, is_directory=True).as_dataset()\n",
        "output_split_test = PipelineData(\"output_split_test\", datastore=default_store, is_directory=True).as_dataset()\n",
        "\n",
        "print('Data split script is in {}.'.format(os.path.realpath(train_model_folder)))\n",
        "\n",
        "# test train split step creation\n",
        "# See the train_test_split.py for details about input and output\n",
        "testTrainSplitStep = PythonScriptStep(\n",
        "    name=\"Train Test Data Split\",\n",
        "    script_name=\"train_test_split.py\", \n",
        "    arguments=[\"--output_split_train\", output_split_train,\n",
        "               \"--output_split_test\", output_split_test],\n",
        "    inputs=[transformed_data.parse_parquet_files()],\n",
        "    outputs=[output_split_train, output_split_test],\n",
        "    compute_target=aml_compute,\n",
        "    runconfig = aml_run_config,\n",
        "    source_directory=train_model_folder,\n",
        "    allow_reuse=True\n",
        ")\n",
        "\n",
        "print(\"testTrainSplitStep created.\")"
      ]
    },
    {
      "cell_type": "markdown",
      "metadata": {},
      "source": [
        "## Use automated machine learning to build regression model\n",
        "Now we use **automated machine learning** to build the regression model, with [AutoMLStep](https://docs.microsoft.com/python/api/azureml-train-automl-runtime/azureml.train.automl.runtime.automl_step.automlstep?view=azure-ml-py) in Azure Machine Learning Pipelines. Run `pip install azureml-sdk[automl]` to get the automated machine learning package. Automated machine learning uses various features from the dataset to build a model of the relationship between those features and the price of a taxi trip."
      ]
    },
    {
      "cell_type": "markdown",
      "metadata": {},
      "source": [
        "### Automatically train a model"
      ]
    },
    {
      "cell_type": "markdown",
      "metadata": {},
      "source": [
        "#### Create experiment"
      ]
    },
    {
      "cell_type": "code",
      "execution_count": null,
      "metadata": {},
      "outputs": [],
      "source": [
        "from azureml.core import Experiment\n",
        "\n",
        "experiment = Experiment(ws, 'NYCTaxi_Tutorial_Pipelines')\n",
        "\n",
        "print(\"Experiment created\")"
      ]
    },
    {
      "cell_type": "markdown",
      "metadata": {},
      "source": [
        "#### Define settings for autogeneration and tuning\n",
        "\n",
        "Here we define the experiment parameters and model settings for autogeneration and tuning. We can also pass `automl_settings` as `**kwargs`.\n",
        "\n",
        "Use your defined training settings as a parameter to an `AutoMLConfig` object. Additionally, specify your training data and the type of model, which is `regression` in this case.\n",
        "\n",
        "Note: When using AmlCompute, we can't pass NumPy arrays directly to the fit method."
      ]
    },
    {
      "cell_type": "code",
      "execution_count": null,
      "metadata": {},
      "outputs": [],
      "source": [
        "from azureml.train.automl import AutoMLConfig\n",
        "\n",
        "# Change iterations to a reasonable number (50) to get better accuracy\n",
        "automl_settings = {\n",
        "    \"iteration_timeout_minutes\" : 10,\n",
        "    \"iterations\" : 2,\n",
        "    \"primary_metric\" : 'spearman_correlation',\n",
        "    \"n_cross_validations\": 5\n",
        "}\n",
        "\n",
        "training_dataset = output_split_train.parse_parquet_files().keep_columns(['pickup_weekday','pickup_hour', 'distance','passengers', 'vendor', 'cost'])\n",
        "\n",
        "automl_config = AutoMLConfig(task = 'regression',\n",
        "                             debug_log = 'automated_ml_errors.log',\n",
        "                             path = train_model_folder,\n",
        "                             compute_target = aml_compute,\n",
        "                             featurization = 'auto',\n",
        "                             training_data = training_dataset,\n",
        "                             label_column_name = 'cost',\n",
        "                             **automl_settings)\n",
        "                             \n",
        "print(\"AutoML config created.\")"
      ]
    },
    {
      "cell_type": "markdown",
      "metadata": {},
      "source": [
        "#### Define AutoMLStep"
      ]
    },
    {
      "cell_type": "code",
      "execution_count": null,
      "metadata": {},
      "outputs": [],
      "source": [
        "from azureml.pipeline.steps import AutoMLStep\n",
        "\n",
        "trainWithAutomlStep = AutoMLStep(name='AutoML_Regression',\n",
        "                                 automl_config=automl_config,\n",
        "                                 allow_reuse=True)\n",
        "print(\"trainWithAutomlStep created.\")"
      ]
    },
    {
      "cell_type": "markdown",
      "metadata": {},
      "source": [
        "#### Build and run the pipeline"
      ]
    },
    {
      "cell_type": "code",
      "execution_count": null,
      "metadata": {},
      "outputs": [],
      "source": [
        "from azureml.pipeline.core import Pipeline\n",
        "from azureml.widgets import RunDetails\n",
        "\n",
        "pipeline_steps = [trainWithAutomlStep]\n",
        "\n",
        "pipeline = Pipeline(workspace = ws, steps=pipeline_steps)\n",
        "print(\"Pipeline is built.\")\n",
        "\n",
        "pipeline_run = experiment.submit(pipeline, regenerate_outputs=False)\n",
        "\n",
        "print(\"Pipeline submitted for execution.\")"
      ]
    },
    {
      "cell_type": "code",
      "execution_count": null,
      "metadata": {},
      "outputs": [],
      "source": [
        "RunDetails(pipeline_run).show()"
      ]
    },
    {
      "cell_type": "markdown",
      "metadata": {},
      "source": [
        "### Explore the results"
      ]
    },
    {
      "cell_type": "code",
      "execution_count": null,
      "metadata": {},
      "outputs": [],
      "source": [
        "# Before we proceed we need to wait for the run to complete.\n",
        "pipeline_run.wait_for_completion(show_output=False)\n",
        "\n",
        "# functions to download output to local and fetch as dataframe\n",
        "def get_download_path(download_path, output_name):\n",
        "    output_folder = os.listdir(download_path + '/azureml')[0]\n",
        "    path =  download_path + '/azureml/' + output_folder + '/' + output_name\n",
        "    return path\n",
        "\n",
        "def fetch_df(current_step, output_name):\n",
        "    output_data = current_step.get_output_data(output_name)    \n",
        "    download_path = './outputs/' + output_name\n",
        "    output_data.download(download_path, overwrite=True)\n",
        "    df_path = get_download_path(download_path, output_name) + '/processed.parquet'\n",
        "    return pd.read_parquet(df_path)"
      ]
    },
    {
      "cell_type": "markdown",
      "metadata": {},
      "source": [
        "#### View cleansed taxi data"
      ]
    },
    {
      "cell_type": "code",
      "execution_count": null,
      "metadata": {},
      "outputs": [],
      "source": [
        "green_cleanse_step = pipeline_run.find_step_run(cleansingStepGreen.name)[0]\n",
        "yellow_cleanse_step = pipeline_run.find_step_run(cleansingStepYellow.name)[0]\n",
        "\n",
        "cleansed_green_df = fetch_df(green_cleanse_step, cleansed_green_data.name)\n",
        "cleansed_yellow_df = fetch_df(yellow_cleanse_step, cleansed_yellow_data.name)\n",
        "\n",
        "display(cleansed_green_df.head(5))\n",
        "display(cleansed_yellow_df.head(5))"
      ]
    },
    {
      "cell_type": "markdown",
      "metadata": {},
      "source": [
        "#### View the combined taxi data profile"
      ]
    },
    {
      "cell_type": "code",
      "execution_count": null,
      "metadata": {},
      "outputs": [],
      "source": [
        "merge_step = pipeline_run.find_step_run(mergingStep.name)[0]\n",
        "combined_df = fetch_df(merge_step, merged_data.name)\n",
        "\n",
        "display(combined_df.describe())"
      ]
    },
    {
      "cell_type": "markdown",
      "metadata": {},
      "source": [
        "#### View the filtered taxi data profile"
      ]
    },
    {
      "cell_type": "code",
      "execution_count": null,
      "metadata": {},
      "outputs": [],
      "source": [
        "filter_step = pipeline_run.find_step_run(filterStep.name)[0]\n",
        "filtered_df = fetch_df(filter_step, filtered_data.name)\n",
        "\n",
        "display(filtered_df.describe())"
      ]
    },
    {
      "cell_type": "markdown",
      "metadata": {},
      "source": [
        "#### View normalized taxi data"
      ]
    },
    {
      "cell_type": "code",
      "execution_count": null,
      "metadata": {},
      "outputs": [],
      "source": [
        "normalize_step = pipeline_run.find_step_run(normalizeStep.name)[0]\n",
        "normalized_df = fetch_df(normalize_step, normalized_data.name)\n",
        "\n",
        "display(normalized_df.head(5))"
      ]
    },
    {
      "cell_type": "markdown",
      "metadata": {},
      "source": [
        "#### View transformed taxi data"
      ]
    },
    {
      "cell_type": "code",
      "execution_count": null,
      "metadata": {},
      "outputs": [],
      "source": [
        "transform_step = pipeline_run.find_step_run(transformStep.name)[0]\n",
        "transformed_df = fetch_df(transform_step, transformed_data.name)\n",
        "\n",
        "display(transformed_df.describe())\n",
        "display(transformed_df.head(5))"
      ]
    },
    {
      "cell_type": "markdown",
      "metadata": {},
      "source": [
        "#### View training data used by AutoML"
      ]
    },
    {
      "cell_type": "code",
      "execution_count": null,
      "metadata": {},
      "outputs": [],
      "source": [
        "split_step = pipeline_run.find_step_run(testTrainSplitStep.name)[0]\n",
        "train_split = fetch_df(split_step, output_split_train.name)\n",
        "\n",
        "display(train_split.describe())\n",
        "display(train_split.head(5))"
      ]
    },
    {
      "cell_type": "markdown",
      "metadata": {},
      "source": [
        "#### View the details of the AutoML run"
      ]
    },
    {
      "cell_type": "code",
      "execution_count": null,
      "metadata": {},
      "outputs": [],
      "source": [
        "from azureml.train.automl.run import AutoMLRun\n",
        "#from azureml.widgets import RunDetails\n",
        "\n",
        "# workaround to get the automl run as its the last step in the pipeline \n",
        "# and get_steps() returns the steps from latest to first\n",
        "\n",
        "for step in pipeline_run.get_steps():\n",
        "    automl_step_run_id = step.id\n",
        "    print(step.name)\n",
        "    print(automl_step_run_id)\n",
        "    break\n",
        "\n",
        "automl_run = AutoMLRun(experiment = experiment, run_id=automl_step_run_id)\n",
        "#RunDetails(automl_run).show()"
      ]
    },
    {
      "cell_type": "markdown",
      "metadata": {},
      "source": [
        "### Retreive the best model\n",
        "\n",
        "Uncomment the below cell to retrieve the best model"
      ]
    },
    {
      "cell_type": "code",
      "execution_count": null,
      "metadata": {},
      "outputs": [],
      "source": [
        "# best_run, fitted_model = automl_run.get_output()\n",
        "# print(best_run)\n",
        "# print(fitted_model)"
      ]
    },
    {
      "cell_type": "markdown",
      "metadata": {},
      "source": [
        "### Test the model"
      ]
    },
    {
      "cell_type": "markdown",
      "metadata": {},
      "source": [
        "#### Get test data\n",
        "\n",
        "Uncomment the below cell to get test data"
      ]
    },
    {
      "cell_type": "code",
      "execution_count": null,
      "metadata": {},
      "outputs": [],
      "source": [
        "# split_step = pipeline_run.find_step_run(testTrainSplitStep.name)[0]\n",
        "\n",
        "# x_test = fetch_df(split_step, output_split_test.name)[['distance','passengers', 'vendor','pickup_weekday','pickup_hour']]\n",
        "# y_test = fetch_df(split_step, output_split_test.name)[['cost']]\n",
        "\n",
        "# display(x_test.head(5))\n",
        "# display(y_test.head(5))"
      ]
    },
    {
      "cell_type": "markdown",
      "metadata": {},
      "source": [
        "#### Test the best fitted model\n",
        "\n",
        "Uncomment the below cell to test the best fitted model"
      ]
    },
    {
      "cell_type": "code",
      "execution_count": null,
      "metadata": {},
      "outputs": [],
      "source": [
        "# y_predict = fitted_model.predict(x_test)\n",
        "\n",
        "# y_actual =  y_test.values.tolist()\n",
        "\n",
        "# display(pd.DataFrame({'Actual':y_actual, 'Predicted':y_predict}).head(5))"
      ]
    },
    {
      "cell_type": "code",
      "execution_count": null,
      "metadata": {},
      "outputs": [],
      "source": [
        "# import matplotlib.pyplot as plt\n",
        "\n",
        "# fig = plt.figure(figsize=(14, 10))\n",
        "# ax1 = fig.add_subplot(111)\n",
        "\n",
        "# distance_vals = [x[0] for x in x_test.values]\n",
        "\n",
        "# ax1.scatter(distance_vals[:100], y_predict[:100], s=18, c='b', marker=\"s\", label='Predicted')\n",
        "# ax1.scatter(distance_vals[:100], y_actual[:100], s=18, c='r', marker=\"o\", label='Actual')\n",
        "\n",
        "# ax1.set_xlabel('distance (mi)')\n",
        "# ax1.set_title('Predicted and Actual Cost/Distance')\n",
        "# ax1.set_ylabel('Cost ($)')\n",
        "\n",
        "# plt.legend(loc='upper left', prop={'size': 12})\n",
        "# plt.rcParams.update({'font.size': 14})\n",
        "# plt.show()"
      ]
    }
  ],
  "metadata": {
    "authors": [
      {
        "name": "anshirga"
      }
    ],
    "kernelspec": {
      "display_name": "Python 3.8 - AzureML",
      "language": "python",
      "name": "python38-azureml"
    },
    "language_info": {
      "codemirror_mode": {
        "name": "ipython",
        "version": 3
      },
      "file_extension": ".py",
      "mimetype": "text/x-python",
      "name": "python",
      "nbconvert_exporter": "python",
      "pygments_lexer": "ipython3",
      "version": "3.6.9"
    }
  },
  "nbformat": 4,
  "nbformat_minor": 2
}